#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project ：henan_sumo
@File ：main_send.py
@IDE ：PyCharm
@Author ：xinyingjie
@Date ：2025/4/16 17:48
'''
import threading

from config.config_hebing import KAFKA_HOST, KAFKA_E1_TOPIC
from loguru import logger
import time
import json
from kafka import KafkaConsumer, KafkaProducer
# from utils_t import time_str

# Producer used for the replay: keys are str-encoded, values are dumped to
# JSON and encoded as UTF-8 bytes.
kafka_producer = KafkaProducer(bootstrap_servers=KAFKA_HOST, key_serializer=str.encode,
                               value_serializer=lambda x: json.dumps(x).encode('utf-8'))
# Exported capture to replay; it is read as one JSON document per line.
file_path = r"F:\辛英杰\proj2025\河南数字换转型\kafkatool\export\20250506-120000_20250506-140000.json"
# Timestamp of the previously read record; None until the first record is seen.
before_time = None
try:
    with open(file_path, 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                # A blank line is not end-of-file: skip it instead of
                # silently ending the replay early.
                continue
            # Each record must carry a 'timestamp' key (KeyError otherwise).
            data = json.loads(line)
            timestamp = data['timestamp']
            if before_time is None:
                # First record: there is no gap to wait for, publish it now
                # rather than dropping it.
                kafka_producer.send(KAFKA_E1_TOPIC, value=data,
                                    key=str(timestamp))
            else:
                # The difference is divided by 1000 before being handed to
                # time.sleep (seconds), i.e. timestamps are treated as ms.
                s_t = (timestamp - before_time) / 1000
                if s_t > 0:
                    # Wait out the gap since the previous record BEFORE
                    # publishing, so the spacing between sends matches the
                    # spacing between the recorded timestamps.
                    time.sleep(s_t)
                    kafka_producer.send(KAFKA_E1_TOPIC, value=data,
                                        key=str(timestamp))
                # Records whose timestamp does not advance (s_t <= 0) are
                # still skipped, as in the original script; this also keeps
                # a negative value away from time.sleep.
            # The baseline follows every record read, including skipped ones.
            before_time = timestamp
finally:
    # send() only enqueues the record; block until everything queued so far
    # has actually been delivered, otherwise the tail of the replay can be
    # lost when the interpreter exits.
    kafka_producer.flush()
